package com.dahuan.state;

import com.dahuan.bean.SensorReading;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class State_FaultTolerance {
    public static void main(String[] args) throws Exception{

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism( 1 );
//        //TODO 基于事件时间
//        env.setStreamTimeCharacteristic( TimeCharacteristic.EventTime );
//        //水印之间的间隔（以毫秒为单位）
//        env.getConfig().setAutoWatermarkInterval( 100 );


        //1.状态后端设置
        env.setStateBackend( new MemoryStateBackend() );
        //hadoop集群中的hdfs某个目录作为检查点
        env.setStateBackend( new FsStateBackend( "" ) );
        env.setStateBackend( new RocksDBStateBackend(  "") );


        DataStream<String> inputStream = env.socketTextStream( "localhost", 7777 );

        DataStream<SensorReading> dataStream = inputStream.map( data -> {
            String[] split = data.split( "," );
            return new SensorReading( split[0], new Long( split[1] ), new Double( split[2] ) );
        } );



        dataStream.print();

        env.execute("State_FaultTolerance");
    }
}
